package com.wsjj.yjh.watermark;

import com.wsjj.yjh.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
//        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> source = env.socketTextStream("hadoop102", 7777);
        SingleOutputStreamOperator<WaterSensor> map = source.map(value -> {
            String[] split = value.split(",");
            return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.parseInt(split[2]));
        });

        SingleOutputStreamOperator<WaterSensor> watermarks = map.assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        System.out.println(element.getTs() );
                        return element.getTs() * 1000L ;
                    }
                }));

        KeyedStream<WaterSensor, String> keyedStream = watermarks.keyBy(WaterSensor::getId);

        OutputTag<WaterSensor> waterSensorOutputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        WindowedStream<WaterSensor, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(waterSensorOutputTag);

        SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                String start = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss");
                String end = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss");
                System.out.println(start);
                System.out.println(end);
                long count = elements.spliterator().estimateSize();
                out.collect("key=" + s + "的窗口[" + start + "," + end + ")包含" + count + "条数据===>" + elements.toString());
            }
        });

        process.print("正常数据");
        process.getSideOutput(waterSensorOutputTag).printToErr();

        env.execute();
    }
}
